[[kafka-listener-annotation]]
= `@KafkaListener` Annotation

The `@KafkaListener` annotation is used to designate a bean method as a listener for a listener container.
The bean is wrapped in a `MessagingMessageListenerAdapter` configured with various features, such as converters to convert the data, if necessary, to match the method parameters.

You can configure most attributes on the annotation with SpEL by using `#{...}` or property placeholders (`${...}`).
See the https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html[Javadoc] for more information.

[[record-listener]]
== Record Listeners

The `@KafkaListener` annotation provides a mechanism for simple POJO listeners.
The following example shows how to use it:

[source, java]
----
public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}
----

This mechanism requires an `@EnableKafka` annotation on one of your `@Configuration` classes and a listener container factory, which is used to configure the underlying `ConcurrentMessageListenerContainer`.
By default, a bean with name `kafkaListenerContainerFactory` is expected.
The following example shows how to use `ConcurrentMessageListenerContainer`:

[source, java]
----
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ...
        return props;
    }
}
----

Notice that, to set container properties, you must use the `getContainerProperties()` method on the factory.
It is used as a template for the actual properties injected into the container.

Starting with version 2.1.1, you can now set the `client.id` property for consumers created by the annotation.
The `clientIdPrefix` is suffixed with `-n`, where `n` is an integer representing the container number when using concurrency.

Starting with version 2.2, you can now override the container factory's `concurrency` and `autoStartup` properties by using properties on the annotation itself.
The properties can be simple values, property placeholders, or SpEL expressions.
The following example shows how to do so:

[source, java]
----
@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}
----

[[manual-assignment]]
== Explicit Partition Assignment

You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets).
The following example shows how to do so:

[source, java]
----
@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
----

You can specify each partition in the `partitions` or `partitionOffsets` attribute but not both.

As with most annotation properties, you can use SpEL expressions; for an example of how to generate a large list of partitions, see xref:tips.adoc[Manually Assigning All Partitions].

Starting with version 2.5.5, you can apply an initial offset to all assigned partitions:

[source, java]
----
@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" },
             partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
----

The `*` wildcard represents all partitions in the `partitions` attribute.
There must only be one `@PartitionOffset` with the wildcard in each `@TopicPartition`.

In addition, when the listener implements `ConsumerSeekAware`, `onPartitionsAssigned` is now called, even when using manual assignment.
This allows, for example, any arbitrary seek operations at that time.

Starting with version 2.6.4, you can specify a comma-delimited list of partitions, or partition ranges:

[source, java]
----
@KafkaListener(id = "pp", autoStartup = "false",
        topicPartitions = @TopicPartition(topic = "topic1",
                partitions = "0-5, 7, 10-15"))
public void process(String in) {
    ...
}
----

The range is inclusive; the example above will assign partitions `0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15`.

The same technique can be used when specifying initial offsets:

[source, java]
----
@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1",
             partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
----

The initial offset will be applied to all 6 partitions.

Since 3.2, `@PartitionOffset` support `SeekPosition.END`, `SeekPosition.BEGINNING`, `SeekPosition.TIMESTAMP`, `seekPosition` match `SeekPosition` enum name:

[source, java]
----
@KafkaListener(id = "seekPositionTime", topicPartitions = {
        @TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
                @PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
                @PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
        })
})
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
----

If seekPosition set `END` or `BEGINNING` will ignore `initialOffset` and `relativeToCurrent`.
If seekPosition set `TIMESTAMP`, `initialOffset` means timestamp.

[[manual-acknowledgment]]
== Manual Acknowledgment

When using manual `AckMode`, you can also provide the listener with the `Acknowledgment`.
The following example also shows how to use a different container factory.

[source, java]
----
@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}
----

[[consumer-record-metadata]]
== Consumer Record Metadata

Finally, metadata about the record is available from message headers.
You can use the following header names to retrieve the headers of the message:

* `KafkaHeaders.OFFSET`
* `KafkaHeaders.RECEIVED_KEY`
* `KafkaHeaders.RECEIVED_TOPIC`
* `KafkaHeaders.RECEIVED_PARTITION`
* `KafkaHeaders.RECEIVED_TIMESTAMP`
* `KafkaHeaders.TIMESTAMP_TYPE`

Starting with version 2.5 the `RECEIVED_KEY` is not present if the incoming record has a `null` key; previously the header was populated with a `null` value.
This change is to make the framework consistent with `spring-messaging` conventions where `null` valued headers are not present.

The following example shows how to use the headers:

[source, java]
----
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
----

IMPORTANT: Parameter annotations (`@Payload`, `@Header`) must be specified on the concrete implementation of the listener method; they will not be detected if they are defined on an interface.

Starting with version 2.5, instead of using discrete headers, you can receive record metadata in a `ConsumerRecordMetadata` parameter.

[source, java]
----
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
    ...
}
----

This contains all the data from the `ConsumerRecord` except the key and value.

[[batch-listeners]]
== Batch Listeners

Starting with version 1.1, you can configure `@KafkaListener` methods to receive the entire batch of consumer records received from the consumer poll.

IMPORTANT: xref:retrytopic.adoc[Non-Blocking Retries] are not supported with batch listeners.

To configure the listener container factory to create batch listeners, you can set the `batchListener` property.
The following example shows how to do so:

[source, java]
----
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<  
   return factory;
}
----

NOTE: Starting with version 2.8, you can override the factory's `batchListener` propery using the `batch` property on the `@KafkaListener` annotation.
This, together with the changes to xref:kafka/annotation-error-handling.adoc#error-handlers[Container Error Handlers] allows the same factory to be used for both record and batch listeners.

NOTE: Starting with version 2.9.6, the container factory has separate setters for the `recordMessageConverter` and `batchMessageConverter` properties.
Previously, there was only one property `messageConverter` which applied to both record and batch listeners.

The following example shows how to receive a list of payloads:

[source, java]
----
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}
----

The topic, partition, offset, and so on are available in headers that parallel the payloads.
The following example shows how to use the headers:

[source, java]
----
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}
----

Alternatively, you can receive a `List` of `Message<?>` objects with each offset and other details in each message, but it must be the only parameter (aside from optional `Acknowledgment`, when using manual commits, and/or `Consumer<?, ?>` parameters) defined on the method.
The following example shows how to do so:

[source, java]
----
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}
----

No conversion is performed on the payloads in this case.

If the `BatchMessagingMessageConverter` is configured with a `RecordMessageConverter`, you can also add a generic type to the `Message` parameter and the payloads are converted.
See xref:kafka/serdes.adoc#payload-conversion-with-batch[Payload Conversion with Batch Listeners] for more information.

You can also receive a list of `ConsumerRecord<?, ?>` objects, but it must be the only parameter (aside from optional `Acknowledgment`, when using manual commits and `Consumer<?, ?>` parameters) defined on the method.
The following example shows how to do so:

[source, java]
----
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}
----

Starting with version 2.2, the listener can receive the complete `ConsumerRecords<?, ?>` object returned by the `poll()` method, letting the listener access additional methods, such as `partitions()` (which returns the `TopicPartition` instances in the list) and `records(TopicPartition)` (which gets selective records).
Again, this must be the only parameter (aside from optional `Acknowledgment`, when using manual commits or `Consumer<?, ?>` parameters) on the method.
The following example shows how to do so:

[source, java]
----
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}
----

IMPORTANT: If the container factory has a `RecordFilterStrategy` configured, it is ignored for `ConsumerRecords<?, ?>` listeners, with a `WARN` log message emitted.
Records can only be filtered with a batch listener if the `<List<?>>` form of listener is used.
By default, records are filtered one-at-a-time; starting with version 2.8, you can override `filterBatch` to filter the entire batch in one call.

[[annotation-properties]]
== Annotation Properties

Starting with version 2.0, the `id` property (if present) is used as the Kafka consumer `group.id` property, overriding the configured property in the consumer factory, if present.
You can also set `groupId` explicitly or set `idIsGroup` to false to restore the previous behavior of using the consumer factory `group.id`.

You can use property placeholders or SpEL expressions within most annotation properties, as the following example shows:

[source, java]
----
@KafkaListener(topics = "${some.property}")

@KafkaListener(topics = "#{someBean.someProperty}",
    groupId = "#{someBean.someProperty}.group")
----

Starting with version 2.1.2, the SpEL expressions support a special token: `__listener`.
It is a pseudo bean name that represents the current bean instance within which this annotation exists.

Consider the following example:

[source, java]
----
@Bean
public Listener listener1() {
    return new Listener("topic1");
}

@Bean
public Listener listener2() {
    return new Listener("topic2");
}
----

Given the beans in the previous example, we can then use the following:

[source, java]
----
public class Listener {

    private final String topic;

    public Listener(String topic) {
        this.topic = topic;
    }

    @KafkaListener(topics = "#{__listener.topic}",
        groupId = "#{__listener.topic}.group")
    public void listen(...) {
        ...
    }

    public String getTopic() {
        return this.topic;
    }

}
----

If, in the unlikely event that you have an actual bean called `__listener`, you can change the expression token byusing the `beanRef` attribute.
The following example shows how to do so:

[source, java]
----
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
----

Starting with version 2.2.4, you can specify Kafka consumer properties directly on the annotation, these will override any properties with the same name configured in the consumer factory. You **cannot** specify the `group.id` and `client.id` properties this way; they will be ignored; use the `groupId` and `clientIdPrefix` annotation properties for those.

The properties are specified as individual strings with the normal Java `Properties` file format: `foo:bar`, `foo=bar`, or `foo bar`, as the following example shows:

[source, java]
----
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
----

The following is an example of the corresponding listeners for the example in xref:kafka/sending-messages.adoc#routing-template[Using `RoutingKafkaTemplate`].

[source, java]
----
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
    System.out.println("1: " + in);
}

@KafkaListener(id = "two", topics = "two",
        properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
    System.out.println("2: " + new String(in));
}
----


